package com.proxy.server.frontend.handler.execute;

import com.proxy.common.constant.IsolationType;
import com.proxy.common.constant.PacketType;
import com.proxy.common.exception.ProxyIoException;
import com.proxy.common.packet.BinaryPacket;
import com.proxy.common.packet.CommandPacket;
import com.proxy.common.packet.ErrorPacket;
import com.proxy.common.utils.CharsetUtil;
import com.proxy.server.backend.Trigger;
import com.proxy.server.backend.TriggerBase;
import com.proxy.server.backend.nio.NioBackendConnection;
import com.proxy.server.context.AppContext;
import com.proxy.server.frontend.FrontendConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sends a single command to one NIO backend connection, after first bringing
 * the backend session (charset, transaction isolation, autocommit) in line
 * with the frontend connection's settings.
 *
 * <p>In this variant the result trigger only renumbers the packets delivered
 * by the backend receiver; the calls that would write them to the frontend are
 * commented out.
 *
 * <p>NOTE(review): every step installs its own trigger on the backend receiver
 * and returns right after {@code write(...)}. If {@code write} is asynchronous,
 * a later {@code setTrigger} could replace a trigger before the backend has
 * answered the earlier statement — confirm against the receiver implementation.
 *
 * Created by liufish on 16/12/21.
 */
public class NioSingleExecuteX {


    private static final Logger logger = LogManager.getLogger(NioSingleExecuteX.class);

    /**
     * Synchronises the backend session with the frontend and then writes
     * {@code commandPacket} to the backend.
     *
     * <p>The three session checks run one after another on the proxy executor,
     * in the order charset, transaction isolation, autocommit; each one is
     * awaited before the next is submitted.
     *
     * @param frontendConnection connection whose session settings are the reference
     * @param backendConnection  backend connection that receives the command
     * @param bin                not used by this implementation
     * @param commandPacket      command written to the backend once the session is aligned
     * @throws ProxyIoException wrapping any exception raised while synchronising
     *                          or writing; if the calling thread was interrupted
     *                          while waiting, its interrupt flag is restored first
     */
    public void execute(final FrontendConnection frontendConnection,
                        final NioBackendConnection backendConnection,
                        BinaryPacket bin,
                        CommandPacket commandPacket) {

        try {

            runOnProxyExecutor(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    _checkCharset(frontendConnection, backendConnection);
                    return null;
                }
            });

            runOnProxyExecutor(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    _checkTxIsolation(frontendConnection, backendConnection);
                    return null;
                }
            });

            runOnProxyExecutor(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    _checkAutocommit(frontendConnection, backendConnection);
                    return null;
                }
            });

            // Renumbers every packet of the response with a counter that starts
            // at 1 for this command. Forwarding to the frontend is disabled here
            // (see the commented-out write calls).
            Trigger trigger = new Trigger() {
                AtomicInteger packetId = new AtomicInteger(0);

                @Override
                public void onSuccess(BinaryPacket bin) {
                    bin.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(bin);
                    logger.debug("onSuccess....");
                }

                @Override
                public void onError(BinaryPacket bin) {
                    bin.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(bin);
                }

                @Override
                public void onHeadFieldsEof(BinaryPacket head, List<BinaryPacket> fields, BinaryPacket eof) {
                    head.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(head);
                    for (BinaryPacket field : fields) {
                        field.packetId = packetId.incrementAndGet();
                        //frontendConnection.write(field);
                    }
                    eof.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(eof);
                    logger.debug("onHeadFieldsEof...." + backendConnection.hashCode());
                }

                @Override
                public void onRow(BinaryPacket row) {
                    row.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(row);
                }

                @Override
                public void onRowsEof(BinaryPacket bin) {
                    bin.packetId = packetId.incrementAndGet();
                    //frontendConnection.write(bin);
                    logger.debug("onRowsEof...." + backendConnection.hashCode());
                }
            };
            // A fresh event handler is installed for every command.
            backendConnection.getReceiver().setTrigger(trigger);
            backendConnection.write(commandPacket);


            logger.debug("结束 xxx   execute" + backendConnection.hashCode());

        } catch (InterruptedException ex) {
            // Do not swallow the interruption: restore the flag so callers
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ProxyIoException(ex);
        } catch (Exception ex) {
            throw new ProxyIoException(ex);
        }
    }

    /**
     * Submits {@code task} to the proxy executor and blocks until it completes.
     *
     * @param task work to run on the proxy executor
     * @throws Exception whatever submitting or waiting raises, including
     *                   {@link InterruptedException} when the wait is interrupted
     */
    private void runOnProxyExecutor(Callable<Object> task) throws Exception {
        Future<?> pending = AppContext.getInstance().getProxyExecutor().submit(task);
        pending.get();
    }

    /**
     * Builds a {@code COM_QUERY} packet (packet id 0) carrying {@code sql},
     * installs {@code trigger} on the backend receiver and writes the packet.
     *
     * @param backendConnection connection the statement is written to
     * @param sql               statement text; encoded as UTF-8 so the bytes do
     *                          not depend on the JVM's default charset
     * @param trigger           handler installed on the receiver before the write
     * @throws Exception if installing the trigger or writing fails
     */
    private void sendQuery(NioBackendConnection backendConnection, String sql, Trigger trigger) throws Exception {
        CommandPacket cmd = new CommandPacket();
        cmd.packetId = 0;
        cmd.packetType = PacketType.COM_QUERY;
        cmd.data = sql.getBytes(StandardCharsets.UTF_8);

        backendConnection.getReceiver().setTrigger(trigger);
        backendConnection.write(cmd);
    }


    /**
     * Sends {@code SET NAMES <charset>} to the backend when its charset index
     * differs from the frontend's. On success the backend connection's charset
     * index, charset and db charset are copied from the frontend.
     *
     * <p>NOTE(review): {@code onError} throws from inside the trigger callback;
     * which thread sees that exception depends on who invokes the trigger —
     * confirm it actually reaches a caller that handles it.
     */
    private void _checkCharset(final FrontendConnection frontendConnection, final NioBackendConnection backendConnection) throws Exception {

        // Nothing to do when frontend and backend already agree on the charset.
        if (frontendConnection.getCharsetIndex() == backendConnection.getCharsetIndex()) {
            return;
        }

        // Push the frontend's charset to the backend connection.
        String charset = CharsetUtil.getDbCharset(frontendConnection.getCharsetIndex());

        Trigger trigger = new TriggerBase() {
            @Override
            public void onSuccess(BinaryPacket bin) {
                backendConnection.setCharsetIndex(frontendConnection.getCharsetIndex());
                backendConnection.setCharset(frontendConnection.getCharset());
                backendConnection.setDbCharset(frontendConnection.getDbCharset());
            }

            @Override
            public void onError(BinaryPacket bin) {
                // The error packet is parsed, but its contents are not part of
                // the exception message.
                ErrorPacket err = new ErrorPacket();
                err.read(bin);
                throw new ProxyIoException("async charset error");
            }
        };
        sendQuery(backendConnection, "SET NAMES " + charset, trigger);

        logger.debug("结束   _checkCharset");
    }


    /**
     * Sends {@code SET autocommit=1} or {@code SET autocommit=0} to the backend
     * when its autocommit flag differs from the frontend's. On success the
     * backend connection's flag is set to the frontend's value.
     */
    private void _checkAutocommit(final FrontendConnection frontendConnection, final NioBackendConnection backendConnection) throws Exception {

        if (frontendConnection.isAutoCommit() == backendConnection.isAutoCommit()) {
            return;
        }

        String sql = frontendConnection.isAutoCommit() ? "SET autocommit=1" : "SET autocommit=0";

        Trigger trigger = new TriggerBase() {
            @Override
            public void onSuccess(BinaryPacket bin) {
                backendConnection.setAutoCommit(frontendConnection.isAutoCommit());
            }

            @Override
            public void onError(BinaryPacket bin) {
                throw new ProxyIoException("async autocommit error");
            }
        };
        sendQuery(backendConnection, sql, trigger);

        logger.debug("结束   _checkAutocommit");
    }


    /**
     * Sends the matching {@code SET SESSION TRANSACTION ISOLATION LEVEL ...}
     * statement to the backend when its isolation level differs from the
     * frontend's. On success the backend connection's level is set to the
     * frontend's value.
     *
     * @throws RuntimeException if the frontend reports an isolation level that
     *                          is not one of the four {@link IsolationType}
     *                          constants handled below
     */
    private void _checkTxIsolation(final FrontendConnection frontendConnection, final NioBackendConnection backendConnection) throws Exception {

        if (frontendConnection.getTxIsolation() == backendConnection.getTxIsolation()) {
            return;
        }

        String sql;
        switch (frontendConnection.getTxIsolation()) {
            case IsolationType.READ_UNCOMMITTED:
                sql = "SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED";
                break;
            case IsolationType.READ_COMMITTED:
                sql = "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED";
                break;
            case IsolationType.REPEATED_READ:
                sql = "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ";
                break;
            case IsolationType.SERIALIZABLE:
                sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE";
                break;
            default:
                throw new RuntimeException("txIsolation:" + frontendConnection.getTxIsolation());
        }

        Trigger trigger = new TriggerBase() {
            @Override
            public void onSuccess(BinaryPacket bin) {
                backendConnection.setTxIsolation(frontendConnection.getTxIsolation());
            }

            @Override
            public void onError(BinaryPacket bin) {
                throw new ProxyIoException("async txIsolation error");
            }
        };
        sendQuery(backendConnection, sql, trigger);

        logger.debug("结束   _checkTxIsolation");
    }

}
